進到 [數據分析實作一] Step 3,今天要做的是框起來的地方,我們要把檔案從 Cloud storage
存到 BigQuery,方便後續分析使用。
cloud shell
code
先把程式碼 clone 下來:
git clone https://github.com/xscapex/BigQuery_ITHOME.git
切換到專案目錄底下:
cd ./BigQuery_ITHOME/Day_17
python create_bq_dataset.py
python create_bq_table.py
python gcs_to_bq.py
我們到 BigQuery 的面面即可看到:
create_bq_dataset.py:
# [START bigquery_create_dataset]
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set dataset_id to the ID of the dataset to create.
dataset_id = "ithome-bq-test.tv_shows".format(client.project)
# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset(dataset_id)
# TODO(developer): Specify the geographic location where the dataset should reside.
dataset.location = "US"
# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = client.create_dataset(dataset, timeout=30) # Make an API request.
print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
# [END bigquery_create_dataset]
create_bq_table.py:
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the table to create.
table_id = "ithome-bq-test.tv_shows.tv_shows_dashboard"
schema = [
bigquery.SchemaField("ID", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
bigquery.SchemaField("YEAR", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("AGE", "STRING", mode="REQUIRED"),
bigquery.SchemaField("NETFLIX", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("Hulu", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("Prime_video", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("Disney", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("Type", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("IMDb_score", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("IMDb_Total", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("RT_score", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("RT_Total", "INTEGER", mode="REQUIRED"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table) # Make an API request.
print(
"Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)
gcs_to_bq.py
from google.cloud import bigquery
import glob
import os
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = glob.glob("*.json")[0]
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the table to create.
table_id = "ithome-bq-test.tv_shows.tv_shows_dashboard"
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
)
uri = "gs://ithome-bq-test/mysql_export/tv_shows_dashboard.csv"
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))
從 Cloud storage 傳資料到 BigQuery 的執行細節:
Creating datasets
Create and use tables